ShardedQueue
Why you should use ShardedQueue
ShardedQueue is currently the fastest concurrent collection for use under the highest concurrency and load, compared with the most popular alternatives such as concurrent-queue (see the benchmarks in the benches directory, which you can run yourself).
Installation
Example
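use std::thread::available_parallelism;

use sharded_queue::ShardedQueue;

fn main() {
    // How many threads can physically access [ShardedQueue]
    // simultaneously, needed for computing `shard_count`
    let max_concurrent_thread_count = available_parallelism().unwrap().get();

    let sharded_queue = ShardedQueue::new(max_concurrent_thread_count);

    sharded_queue.push_back(1); // the pushed value is only illustrative
    let item = sharded_queue.pop_front_or_spin_wait_item();
}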
Why you may not want to use ShardedQueue
- Unlike other concurrent queues, ShardedQueue does not guarantee FIFO order. It may seem to, but it does not. Multiple consumers or producers can trigger long resizes of very large shards (all but the last one), enough time can pass for those resizes to finish, and then a single consumer or producer can trigger a long resize of the last shard. All other threads then keep consuming or producing and eventually spin on that last shard, with no guarantee about which of them acquires the spin lock first. So we cannot even guarantee that ShardedQueue::pop_front_or_spin_wait_item will acquire the lock before ShardedQueue::push_back on the first attempt.
- ShardedQueue doesn't track length, because the logic for incrementing and decrementing the length may differ between use cases. So may the logic for when it goes from 1 to 0 or back (in some cases, like NonBlockingMutex, we don't even add an action to the queue when the count reaches 1, but run it immediately in the same thread), or even negative (to optimize some hot paths, as in some schedulers, where it is cheaper to restore the count to a correct state than to enforce that it can never go negative).
- ShardedQueue doesn't have many features: only the necessary methods ShardedQueue::pop_front_or_spin_wait_item and ShardedQueue::push_back are implemented.
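Because ShardedQueue leaves length tracking to the caller, one option is to keep a separate atomic counter next to the queue. The sketch below is illustrative only: CountedQueue is a hypothetical name, and a Mutex<VecDeque<T>> stands in for ShardedQueue so the block compiles with the standard library alone. push_back returns the length observed just before the push, which is enough for a caller to detect the 0 to 1 transition mentioned above.

```rust
use std::collections::VecDeque;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical wrapper that tracks length outside the queue.
/// A Mutex<VecDeque<T>> stands in for ShardedQueue to keep the sketch self-contained.
pub struct CountedQueue<T> {
    len: AtomicUsize,
    queue: Mutex<VecDeque<T>>,
}

impl<T> CountedQueue<T> {
    pub fn new() -> Self {
        Self {
            len: AtomicUsize::new(0),
            queue: Mutex::new(VecDeque::new()),
        }
    }

    /// Pushes an item and returns the length observed just before this push,
    /// so a caller can react to the 0 -> 1 transition.
    pub fn push_back(&self, item: T) -> usize {
        // Increment before inserting, so a later decrement can never underflow.
        let previous_len = self.len.fetch_add(1, Ordering::AcqRel);
        self.queue.lock().unwrap().push_back(item);
        previous_len
    }

    /// Pops an item if one is present and decrements the counter only on success.
    pub fn pop_front(&self) -> Option<T> {
        let item = self.queue.lock().unwrap().pop_front();
        if item.is_some() {
            self.len.fetch_sub(1, Ordering::AcqRel);
        }
        item
    }

    /// Counter value; under concurrency it may briefly differ from the queue contents.
    pub fn len(&self) -> usize {
        self.len.load(Ordering::Acquire)
    }
}
```

The counter and the queue are updated separately, so len() can briefly disagree with the queue under concurrency; that is the kind of per-use-case trade-off the list above leaves to the caller.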
Benchmarks
ShardedQueue outperforms the other concurrent queues measured below.
See the benchmark logic in the benches directory and reproduce the results by running the benchmarks yourself.
Benchmark name | Operations per thread | Concurrent threads | Average time |
---|---|---|---|
sharded_queue_push_and_pop_concurrently | 1_000 | 24 | 1.1344 ms |
concurrent_queue_push_and_pop_concurrently | 1_000 | 24 | 4.8130 ms |
crossbeam_queue_push_and_pop_concurrently | 1_000 | 24 | 5.3154 ms |
queue_mutex_push_and_pop_concurrently | 1_000 | 24 | 6.4846 ms |
sharded_queue_push_and_pop_concurrently | 10_000 | 24 | 8.1651 ms |
concurrent_queue_push_and_pop_concurrently | 10_000 | 24 | 44.660 ms |
crossbeam_queue_push_and_pop_concurrently | 10_000 | 24 | 49.234 ms |
queue_mutex_push_and_pop_concurrently | 10_000 | 24 | 69.207 ms |
sharded_queue_push_and_pop_concurrently | 100_000 | 24 | 77.167 ms |
concurrent_queue_push_and_pop_concurrently | 100_000 | 24 | 445.88 ms |
crossbeam_queue_push_and_pop_concurrently | 100_000 | 24 | 434.00 ms |
queue_mutex_push_and_pop_concurrently | 100_000 | 24 | 476.59 ms |
Design explanation
ShardedQueue is designed to be used in some schedulers and in NonBlockingMutex as the most efficient collection under the highest concurrency and load. A concurrent stack can't outperform it: a queue spreads pop and push contention between the front and the back, whereas a stack both pops from and pushes to the back, which doubles the contention compared with a queue, while the number of atomic increments per pop or push stays the same as in a queue.
ShardedQueue uses an array of queues (shards), each protected by its own Mutex. On every pop or push it atomically increments head_index or tail_index, respectively, and computes the shard index for the current operation by applying a modulo operation to that head_index or tail_index value.
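A minimal sketch of the scheme just described, assuming std::sync::Mutex<VecDeque<T>> as the per-shard queue (the crate may use a different lock and layout, and ShardedQueueSketch is a hypothetical name). Each push or pop takes one number from tail_index or head_index and maps it to a shard with a bit mask; a pop whose matching push has not landed yet spins on its shard.

```rust
use std::collections::VecDeque;
use std::hint::spin_loop;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

/// Hypothetical sketch of the sharding scheme, not the crate's implementation:
/// every shard is a VecDeque behind its own std::sync::Mutex.
pub struct ShardedQueueSketch<T> {
    head_index: AtomicUsize,
    tail_index: AtomicUsize,
    shards: Vec<Mutex<VecDeque<T>>>,
    /// shard_count - 1; a valid mask because shard_count is a power of two.
    modulo_mask: usize,
}

impl<T> ShardedQueueSketch<T> {
    pub fn new(max_concurrent_thread_count: usize) -> Self {
        // Round up to a power of two so that `& modulo_mask` equals `% shard_count`.
        let shard_count = max_concurrent_thread_count.next_power_of_two();
        Self {
            head_index: AtomicUsize::new(0),
            tail_index: AtomicUsize::new(0),
            shards: (0..shard_count).map(|_| Mutex::new(VecDeque::new())).collect(),
            modulo_mask: shard_count - 1,
        }
    }

    pub fn push_back(&self, item: T) {
        // One atomic increment reserves an operation number. Relaxed is enough:
        // the shard Mutex, not this counter, publishes the item itself.
        let operation_number = self.tail_index.fetch_add(1, Ordering::Relaxed);
        let shard_index = operation_number & self.modulo_mask;
        self.shards[shard_index].lock().unwrap().push_back(item);
    }

    pub fn pop_front_or_spin_wait_item(&self) -> T {
        let operation_number = self.head_index.fetch_add(1, Ordering::Relaxed);
        let shard_index = operation_number & self.modulo_mask;
        loop {
            // The guard is dropped at the end of this statement, before spinning.
            let popped = self.shards[shard_index].lock().unwrap().pop_front();
            if let Some(item) = popped {
                return item;
            }
            // The matching push_back has not reached this shard yet.
            spin_loop();
        }
    }
}
```

In a single thread, pushes and pops walk the shards in the same round-robin order, so items come back in FIFO order; with many threads that is no longer guaranteed, as noted earlier.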
The modulo operation is optimized using the identity
x % 2^n == x & (2^n - 1)
so, as long as the number of queues (shards) is a power of two, the modulo can be computed very cheaply with
operation_number % shard_count == operation_number & (shard_count - 1)
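The identity can be checked directly. The helper below (shard_index is a hypothetical name) computes the shard index with a mask and debug-asserts that shard_count is a power of two.

```rust
/// Hypothetical helper: `operation_number % shard_count` computed with a bit mask.
/// Correct only when `shard_count` is a power of two, which is checked in debug builds.
pub fn shard_index(operation_number: usize, shard_count: usize) -> usize {
    debug_assert!(shard_count.is_power_of_two());
    operation_number & (shard_count - 1)
}
```

The power-of-two requirement matters: with shard_count = 6 the mask would be 5, and 13 & 5 is 5 while 13 % 6 is 1.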
As long as the number of shards is a power of two and is greater than or equal to the number of CPUs, and CPUs spend roughly the same time in push/pop (which they do, since both are amortized O(1)), multiple CPUs physically can't access the same shard simultaneously, and we get the best possible performance.
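The sizing rule and the "different CPUs touch different shards" argument can be sketched as follows. Both helpers are hypothetical; the second one assumes, as the paragraph above does, that the operations in flight at the same moment hold consecutive operation numbers, which is what roughly equal push/pop times are meant to approximate.

```rust
use std::collections::HashSet;
use std::thread::available_parallelism;

/// Hypothetical helper: the smallest power of two that is >= the number of CPUs,
/// so the mask trick works and there are at least as many shards as CPUs.
pub fn shard_count_for_this_machine() -> usize {
    let cpu_count = available_parallelism().map(|p| p.get()).unwrap_or(1);
    cpu_count.next_power_of_two()
}

/// Hypothetical check: do `window` consecutive operation numbers starting at
/// `start` all land on different shards? Assumes `shard_count` is a power of two.
pub fn consecutive_operations_hit_distinct_shards(
    start: usize,
    window: usize,
    shard_count: usize,
) -> bool {
    let mask = shard_count - 1;
    let shards: HashSet<usize> = (start..start + window).map(|n| n & mask).collect();
    shards.len() == window
}
```

Any window of shard_count consecutive operation numbers covers every shard exactly once, while a window one larger must reuse a shard.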
Synchronizing the underlying non-concurrent queue costs only:
- 1 additional atomic increment per push or pop (incrementing head_index or tail_index)
- 1 additional compare_and_swap and 1 atomic store (uncontended Mutex acquire and release)
- 1 cheap bit operation (to get the modulo)
- 1 get from the queue (shard) list by index
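The "1 compare_and_swap and 1 atomic store" cost corresponds to an uncontended spin lock. A minimal sketch, assuming a lock built on AtomicBool (SpinLock and SpinLockGuard are hypothetical names, not the crate's types): lock() succeeds with a single compare_exchange when the lock is free, and dropping the guard is a single store with Release ordering.

```rust
use std::cell::UnsafeCell;
use std::hint::spin_loop;
use std::marker::PhantomData;
use std::ops::{Deref, DerefMut};
use std::sync::atomic::{AtomicBool, Ordering};

pub struct SpinLock<T> {
    locked: AtomicBool,
    value: UnsafeCell<T>,
}

// SAFETY: `locked` ensures at most one guard exists, so `value` is never aliased mutably.
unsafe impl<T: Send> Sync for SpinLock<T> {}

pub struct SpinLockGuard<'a, T> {
    lock: &'a SpinLock<T>,
    // Raw-pointer marker makes the guard !Send and !Sync, the conservative choice.
    _not_send_or_sync: PhantomData<*const ()>,
}

impl<T> SpinLock<T> {
    pub const fn new(value: T) -> Self {
        Self { locked: AtomicBool::new(false), value: UnsafeCell::new(value) }
    }

    pub fn lock(&self) -> SpinLockGuard<'_, T> {
        // Uncontended path: exactly one compare-and-swap (false -> true).
        while self
            .locked
            .compare_exchange(false, true, Ordering::Acquire, Ordering::Relaxed)
            .is_err()
        {
            // Contended path: wait with plain loads until the lock looks free, then retry.
            while self.locked.load(Ordering::Relaxed) {
                spin_loop();
            }
        }
        SpinLockGuard { lock: self, _not_send_or_sync: PhantomData }
    }
}

impl<T> Deref for SpinLockGuard<'_, T> {
    type Target = T;
    fn deref(&self) -> &T {
        // SAFETY: holding the guard means holding the lock.
        unsafe { &*self.lock.value.get() }
    }
}

impl<T> DerefMut for SpinLockGuard<'_, T> {
    fn deref_mut(&mut self) -> &mut T {
        // SAFETY: as above, and `&mut self` rules out other borrows through this guard.
        unsafe { &mut *self.lock.value.get() }
    }
}

impl<T> Drop for SpinLockGuard<'_, T> {
    fn drop(&mut self) {
        // Release: exactly one atomic store publishes our writes to the next owner.
        self.lock.locked.store(false, Ordering::Release);
    }
}
```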
Complex example
use sharded_queue::ShardedQueue;
use std::cell::UnsafeCell;
use ;
use std::marker::PhantomData;
use ;
use ;
/// [NonBlockingMutex] is needed to run actions atomically without thread blocking, context
/// switches, spin lock contention, or rescheduling on some scheduler
///
/// Note that it uses [ShardedQueue], which doesn't guarantee retrieval order, hence
/// [NonBlockingMutex] doesn't guarantee execution order either, not even for already
/// added items
/// [Send], [Sync], and [MutexGuard] logic was taken from [std::sync::Mutex]
/// and [std::sync::MutexGuard]
///
/// These are the only places where `T: Send` matters; all other
/// functionality works fine on a single thread.
unsafe
unsafe
/// Code was mostly taken from [std::sync::MutexGuard]; it is expected to prevent [State]
/// from being moved out of the synchronized loop
// TODO: uncomment when this error no longer occurs
// negative trait bounds are not yet fully implemented; use marker types for now [E0658]
// impl<'captured_variables, 'non_blocking_mutex_ref, State: ?Sized> !Send
// for MutexGuard<'captured_variables, 'non_blocking_mutex_ref, State>
// {
// }
unsafe
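The core idea of the NonBlockingMutex above can be sketched in a simplified form. Everything here is hypothetical: NonBlockingMutexSketch and run_or_schedule are illustrative names, actions receive &mut State instead of a MutexGuard, and a Mutex<VecDeque<_>> stands in for ShardedQueue so the block needs only the standard library (that stand-in serializes all pushes behind one lock, which ShardedQueue's sharding is meant to avoid). The thread that moves the task counter from 0 to 1 runs its action immediately and then drains the queue; every other thread only pushes its action and returns.

```rust
use std::cell::UnsafeCell;
use std::collections::VecDeque;
use std::hint::spin_loop;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;

type Action<State> = Box<dyn FnOnce(&mut State) + Send>;

/// Hypothetical, simplified sketch of the NonBlockingMutex idea.
pub struct NonBlockingMutexSketch<State> {
    /// Number of actions that were submitted but have not finished running.
    task_count: AtomicUsize,
    /// Stand-in for ShardedQueue: only push_back and a spin-waiting pop are needed.
    task_queue: Mutex<VecDeque<Action<State>>>,
    state: UnsafeCell<State>,
}

// SAFETY: `state` is only accessed by the thread that moved task_count from 0 to 1,
// and only until that same thread moves it back to 0.
unsafe impl<State: Send> Sync for NonBlockingMutexSketch<State> {}

impl<State> NonBlockingMutexSketch<State> {
    pub fn new(state: State) -> Self {
        Self {
            task_count: AtomicUsize::new(0),
            task_queue: Mutex::new(VecDeque::new()),
            state: UnsafeCell::new(state),
        }
    }

    /// Runs `action` now if no other thread is running actions; otherwise hands it
    /// to that thread and returns without waiting for any action to finish.
    pub fn run_or_schedule(&self, action: impl FnOnce(&mut State) + Send + 'static) {
        if self.task_count.fetch_add(1, Ordering::AcqRel) != 0 {
            self.task_queue.lock().unwrap().push_back(Box::new(action));
            return;
        }
        // We moved the count from 0 to 1: run immediately, without touching the queue.
        action(unsafe { &mut *self.state.get() });
        // A count above 1 means another thread has incremented it and has pushed,
        // or is about to push, its action, so spin-waiting for it cannot deadlock.
        while self.task_count.fetch_sub(1, Ordering::AcqRel) != 1 {
            let next_action = self.pop_front_or_spin_wait();
            next_action(unsafe { &mut *self.state.get() });
        }
    }

    fn pop_front_or_spin_wait(&self) -> Action<State> {
        loop {
            let popped = self.task_queue.lock().unwrap().pop_front();
            if let Some(action) = popped {
                return action;
            }
            spin_loop();
        }
    }

    pub fn into_inner(self) -> State {
        self.state.into_inner()
    }
}
```

Because the counter, not the queue, decides which thread runs actions, the queue never needs to report its length, which is consistent with ShardedQueue not tracking it.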